/*
 * Copyright (C) Lightbend Inc. <https://www.lightbend.com>
 */

package com.lightbend.lagom.javadsl.persistence;

import akka.Done;
import akka.event.Logging;
import akka.japi.Pair;
import akka.stream.javadsl.Flow;
import org.pcollections.PSequence;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

/**
 * A read side processor.
 *
 * <p>Read side processors consume events produced by {@link
 * com.lightbend.lagom.javadsl.persistence.PersistentEntity} instances, and update some read side
 * data store that is optimized for queries.
 *
 * <p>The events they consume must be tagged, and a read side is able to consume events of one or
 * more tags. Events are usually tagged according to some supertype of event, for example, events
 * may be tagged as <code>Order</code> events. They may also be tagged according to a hash of the ID
 * of the entity associated with the event - this allows read side event handling to be sharded
 * across many nodes. Tagging is done using {@link
 * com.lightbend.lagom.javadsl.persistence.AggregateEventTag}.
 *
 * <p>Read side processors are responsible for tracking what events they have already seen. This is
 * done using offsets, which are sequential values associated with each event. Note that end users
 * typically will not need to handle offsets themselves, this will be provided by Lagom support
 * specific to the read side datastore, and end users can just focus on handling the events
 * themselves.
 */
public abstract class ReadSideProcessor<Event extends AggregateEvent<Event>> {

  /**
   * Return the {@link ReadSideHandler} that will process events for this read side.
   *
   * <p>This is invoked to obtain the handler responsible for both offset tracking and event
   * handling.
   *
   * @return The read side handler.
   */
  public abstract ReadSideHandler<Event> buildHandler();

  /**
   * A read side offset processor.
   *
   * <p>This is responsible for the actual read side handling, including handling offsets and the
   * events themselves.
   */
  public abstract static class ReadSideHandler<Event extends AggregateEvent<Event>> {

    /**
     * Prepare the database for all processors.
     *
     * <p>This will be invoked at system startup. It is guaranteed to only be invoked once at a time
     * across the entire cluster, and so is safe to be used to perform actions like creating tables,
     * that could cause problems if done from multiple nodes.
     *
     * <p>It will be invoked again if it fails, and it may be invoked multiple times as nodes of the
     * cluster go up or down. Unless the entire system is restarted, there is no way to guarantee
     * that it will be invoked at a particular time - in particular, it should not be used for doing
     * upgrades unless the entire system is restarted and a new cluster built from scratch.
     *
     * <p>The default implementation does nothing and completes immediately.
     *
     * @return A completion stage that is redeemed when preparation is finished.
     */
    public CompletionStage<Done> globalPrepare() {
      return CompletableFuture.completedFuture(Done.getInstance());
    }

    /**
     * Prepare this processor.
     *
     * <p>The primary purpose of this method is to load the last offset that was processed, so that
     * read side processing can continue from that offset.
     *
     * <p>This also provides an opportunity for processors to do any initialisation activities, such
     * as creating or updating database tables, or migrating data.
     *
     * <p>This will be invoked at least once for each tag, and may be invoked multiple times, such
     * as in the event of failure.
     *
     * <p>The default implementation returns {@link Offset#NONE}, meaning processing starts from
     * the beginning of the event stream.
     *
     * @param tag The tag to get the offset for.
     * @return A completion stage that is redeemed when preparation is finished, yielding the offset
     *     to resume processing from.
     */
    public CompletionStage<Offset> prepare(AggregateEventTag<Event> tag) {
      return CompletableFuture.completedFuture(Offset.NONE);
    }

    /**
     * Flow to handle the events.
     *
     * <p>If the handler does any blocking, this flow should be configured to use a dispatcher that
     * is configured to allow for that blocking.
     *
     * @return The flow, which consumes pairs of events and their offsets, and emits {@code Done}
     *     for each element once it has been handled.
     */
    public abstract Flow<Pair<Event, Offset>, Done, ?> handle();
  }

  /**
   * The tags to aggregate.
   *
   * <p>This must return at least one tag to aggregate. Read side processors will be sharded over
   * the cluster by these tags, so if events are tagged by a shard key, the read side processing
   * load can be distributed across the cluster.
   *
   * @return The tags to aggregate.
   */
  public abstract PSequence<AggregateEventTag<Event>> aggregateTags();

  /**
   * The name of this read side.
   *
   * <p>This name should be unique among the read sides and entity types of the service. By default
   * it is using the short class name of the concrete `ReadSideProcessor` class. Subclasses may
   * override to define other type names. It is wise to override and retain the original name when
   * the class name is changed because this name is used to identify read sides throughout the
   * cluster.
   *
   * @return The read side name; by default, the short class name of this processor class.
   */
  public String readSideName() {
    return Logging.simpleName(getClass());
  }
}
